package com.ruoyi.framework.websocket;

import com.alibaba.fastjson2.JSONArray;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.ruoyi.common.core.redis.RedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;


/**
 * WebSocket服务类
 *
 * @author csf
 * @date 2020年8月10日
 */
@Component
@ServerEndpoint("/websocket/message/{userId}")
public class WebSocketServer {

    private static final String unreadMsgKey = "unread_msg_%s";

    Logger log = LoggerFactory.getLogger(this.getClass());

    // 存放每个用户对应的WebSocket连接对象,key为userId，value为Set<Session>
    // 一个登录用户只建立一个连接，同一个用户多处登录，保存为HashSet
    private static Map<Long, Set<Session>> webSocketSessionMap = new ConcurrentHashMap<Long, Set<Session>>();

    /**
     * 默认最多允许同时在线人数100
     */
    public static int socketMaxOnlineCount = 1000;

    private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);

    /**
     * 连接建立成功调用的方法
     *
     * @param session 连接会话，由框架创建
     * @param userId  用户id， 为处理用户多点登录都能收到消息
     * @author csf
     * @date 2020年8月10日
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Long userId) throws IOException {
        Set<Session> sessions = webSocketSessionMap.get(userId);
        if (CollectionUtils.isEmpty(sessions) || !webSocketSessionMap.containsKey(userId)) {
            sessions = new HashSet<Session>();
        }
        if (!sessions.contains(session)) {
            boolean semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
            if (!semaphoreFlag) {
                // 未获取到信号量
                log.error("当前在线人数超过限制数- {}", socketMaxOnlineCount);
                session.getBasicRemote().sendText("当前在线人数超过限制数：" + socketMaxOnlineCount);
                session.close();
                return;
            }
            sessions.add(session);
            webSocketSessionMap.put(userId, sessions);
            log.info("有新连接[{}]接入,当前websocket连接数为:{}", userId, getOnlineCount());
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(@PathParam("userId") Long userId, Session session) {
        if (webSocketSessionMap.containsKey(userId)) {
            try {
                Set<Session> sessions = webSocketSessionMap.get(userId);
                sessions.remove(session);
                session.close();
                SemaphoreUtils.release(socketSemaphore);
                if (CollectionUtils.isEmpty(sessions)) {
                    webSocketSessionMap.remove(userId);
                }
            } catch (IOException e) {
                log.error("连接[{}]关闭失败。", userId);
                e.printStackTrace();
            }
        }
    }

    /**
     * 接收客户端发送的消息
     *
     * @param message 客户端发送过来的消息
     * @param session websocket会话
     */
    @OnMessage
    public void onMessage(@PathParam("userId") Long userId, String message, Session session) {
        log.info("收到来自客户端用户：" + userId + "的信息:" + message);
    }

    /**
     * 连接错误时触发
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(@PathParam("userId") Long userId, Session session, Throwable error) {
        log.error("socket错误，{}", error.getMessage(), error);
        try {
            Set<Session> sessions = webSocketSessionMap.get(userId);
            sessions.remove(session);
            session.close();
            SemaphoreUtils.release(socketSemaphore);
            if (CollectionUtils.isEmpty(sessions)) {
                webSocketSessionMap.remove(userId);
            }
        } catch (IOException e) {
            log.error("连接[{}]关闭失败。", e.getMessage(), e);
        }
    }

    /**
     * 给指定的客户端推送消息，可单发和群发
     *
     * @param userId  发送消息给目标客户端key，多个逗号“,”隔开1234,2345...
     * @param message
     * @throws IOException
     * @author csf
     * @date 2020年8月11日
     */
    public void sendMessage(Long userId, String message) {
        if (Objects.nonNull(userId)) {
            try {
                Set<Session> sessions = webSocketSessionMap.get(userId);
                if (CollectionUtils.isEmpty(sessions)) {
                    log.info("没有id为{}的目标客户端", userId);
                    return;
                }
                for (Session sess : sessions) {
                    sess.getBasicRemote().sendText(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            log.info("userId为空");
        }
    }

    public static synchronized int getOnlineCount() {
        return socketMaxOnlineCount - socketSemaphore.availablePermits();
    }

}